package com.xiaojiezhu.spark.rdd2

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 两个pair的负操作
  */
object Scala2PairAction {

  def main(args : Array[String]): Unit ={
    val conf = new SparkConf().setMaster("local").setAppName("app")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List((1,2),(3,4),(3,6)))
    val rdd2 = sc.parallelize(List((3,9)))

    //subtractByKey(rdd1,rdd2)
    //join(rdd1,rdd2)
    //rightOuterJoin(rdd1,rdd2)
    //leftOuterJoin(rdd1,rdd2)
    cogroup(rdd1,rdd2)
  }

  /**
    * 将两个rdd拥有有相同的键的数据分组在一起
    * @param rdd1
    * @param rdd2
    */
  def cogroup(rdd1 : RDD[(Int,Int)],rdd2 : RDD[(Int,Int)]): Unit ={
    val result = rdd1.cogroup(rdd2)
    result.foreach(x => println(x._1 + " , " + x._2))
  }

  /**
    * 对两个rdd进行连接，确保第二个rdd的键必须存在，相当于数据库的left join，左外连接
    * @param rdd1
    * @param rdd2
    */
  def leftOuterJoin(rdd1 : RDD[(Int,Int)], rdd2 : RDD[(Int,Int)]): Unit ={
    val result = rdd1.leftOuterJoin(rdd2)
    result.foreach(x => println(x._1 + " , " + x._2))
  }

  /**
    * 对两个rdd进行连接，确保第一个rdd的键必须存在,相当于数据库的right join，右外连接
    * @param rdd1
    * @param rdd2
    */
  def rightOuterJoin(rdd1 : RDD[(Int,Int)],rdd2 :  RDD[(Int,Int)]): Unit ={
    val result = rdd1.rightOuterJoin(rdd2)
    result.foreach(x => println(x._1 + " , " + x._2))
  }

  /**
    * 对两个rdd进行内连接，会包含两个rdd中都拥有的key
    * @param rdd1
    * @param rdd2
    */
  def join(rdd1 : RDD[(Int,Int)],rdd2 : RDD[(Int,Int)]): Unit ={
    val result = rdd1.join(rdd2)
    result.foreach(x => println(x._1 + " , " + x._2))
  }

  /**
    * 删除rdd中包含另一个rdd的key的元素
    * @param rdd1
    * @param rdd2
    */
  def subtractByKey(rdd1 : RDD[(Int,Int)], rdd2 : RDD[(Int,Int)]): Unit ={
    val result = rdd1.subtractByKey(rdd2)
    result.foreach(x => println(x._1 + " , " + x._2))
  }
}
